-
Notifications
You must be signed in to change notification settings - Fork 805
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
buffer events when decision task is inflight #386
Conversation
c16de57
to
a44aeeb
Compare
a44aeeb
to
d1256b3
Compare
emptyEventID int64 = -23 | ||
firstEventID int64 = 1 | ||
emptyEventID int64 = -23 | ||
bufferedEventID int64 = -123 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just curious why choose number -123? (also why emptyEventID -23)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I just pick a random number that is unique and is less than 0.
d1256b3
to
abfe317
Compare
func (d *cassandraPersistence) updateBufferedEvents(batch *gocql.Batch, newBufferedEvents []*SerializedHistoryEventBatch, | ||
clearBufferedEvents bool, domainID, workflowID, runID string, condition int64, rangeID int64) { | ||
|
||
if clearBufferedEvents { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
newBufferedEvents and clearBufferedEvents should be mutually exclusive. Let's have some validation here to make sure only one is set and convert this piece of code into if-else block.
@@ -942,6 +964,14 @@ func (d *cassandraPersistence) GetWorkflowExecution(request *GetWorkflowExecutio | |||
} | |||
state.RequestCancelInfos = requestCancelInfos | |||
|
|||
eList := result["buffered_events_list"].([]map[string]interface{}) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you refactor this logic in a helper method. For example look at 'createTimerTaskInfo'. We create a helper method to deserialize each of these custom types.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
the logic is already put in a help function createSerializedHistoryEventBatch().
eventBatch := &SerializedHistoryEventBatch{EncodingType: common.EncodingTypeJSON} | ||
for k, v := range result { | ||
switch k { | ||
case "version": |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm fine with the default but why are you not reading the encoding_type?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since the value is already set to JSON when create the struct in this method, so there is no need to read the encoding_type, unless we support other encoding type.
service/history/historyEngine.go
Outdated
@@ -573,7 +573,7 @@ Update_History_Loop: | |||
var failCause workflow.DecisionTaskFailedCause | |||
var err error | |||
completedID := *completedEvent.EventId | |||
hasUnhandledEvents := ((completedID - startedID) > 1) | |||
hasUnhandledEvents := (msBuilder.GetNextEventID() - completedID) > 1 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Two things:
- We should first append all the decisions as part of decision task completed before flushing buffer to history events.
- Then this check could just be msBuilder.HasBufferedEvents.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will update
@@ -58,6 +58,10 @@ type ( | |||
updateRequestCancelInfos []*persistence.RequestCancelInfo // Modified RequestCancel Infos since last update | |||
deleteRequestCancelInfo *int64 // Deleted RequestCancel Info since last update | |||
|
|||
persistedBufferedEvents []*persistence.SerializedHistoryEventBatch // buffered history events that are already persisted |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use the same naming convention like for other mutable state items.
bufferedEvents and updateBufferedEvents
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will update
@@ -75,6 +79,8 @@ type ( | |||
updateChildExecutionInfos []*persistence.ChildExecutionInfo | |||
deleteChildExecutionInfo *int64 | |||
continueAsNew *persistence.CreateWorkflowExecutionRequest | |||
newBufferedEvents []*persistence.SerializedHistoryEventBatch |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does this needs to be an array? Every update should create a single batch of serialized event.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is to make unit test pass
// put new events into 2 buckets: | ||
// 1) if the event was added while there was in-flight decision, then put it in buffered bucket | ||
// 2) otherwise, put it in committed bucket | ||
var newBufferedEvents []*workflow.HistoryEvent |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I guess this could be simplified as we can never have partial buffered or committed events in the same update.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but you still need to check if this is buffered or committed batch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I tried to put the events into buffered or committed based on first event's event id, but it would break lots of unit tests, mostly because we have the tests that addDecisionTaskStarted, then add some other events, then add decisionTaskCompleted. I would spend more time to update those tests, but i feel it is not worth it.
All other comments have been addressed.
c7f2ad7
to
f1f67f0
Compare
f1f67f0
to
1b66aa0
Compare
No description provided.